在 Day 27,我們探討了 StringView 如何提升字串處理效能。這是一種「靜態」的優化——透過更好的數據表示來加速操作。今天,我們將探討另一個層面的優化:動態決策優化,也就是讓查詢優化器根據實際數據特徵來做出最佳選擇。
回顧前幾週的學習,我們在 Day 13 認識了優化器框架,知道 DataFusion 有許多優化規則。在 Day 20-21 學習 Join 算子時,我們也提到過一個關鍵問題:如何選擇 Build 側和 Probe 側?當時我們知道「應該讓小表作為 Build 側」,但優化器怎麼知道哪個表更小呢?
答案就在今天的主題:統計資訊(Statistics)與基於成本的優化(Cost-Based Optimization, CBO)。
在傳統數據庫(如 PostgreSQL、MySQL)中,統計資訊是查詢優化的核心。透過收集表的行數、欄位分佈、唯一值數量等資訊,優化器能夠估算不同執行計劃的成本,選擇最優方案。DataFusion 作為現代化的查詢引擎,同樣具備這樣的能力,並且在設計上更加靈活和可擴展。
今天我們將深入理解:
想像一個簡單的 JOIN 查詢:
SELECT *
FROM orders o
JOIN customers c ON o.customer_id = c.id;
優化器需要決定:
如果沒有統計資訊,優化器只能「猜測」:
盲目決策場景:
假設 orders 有 1000 萬行,customers 有 1 萬行
錯誤決策 1:用 orders 建立 Hash Table
  結果:需要 2GB 記憶體,可能觸發 Spilling
  
正確決策:用 customers 建立 Hash Table
  結果:只需 20MB 記憶體,完全在記憶體中完成
性能差異:3-5 倍!
有了統計資訊,優化器可以做出數據驅動的決策:
統計資訊驅動決策:
orders 表統計:
  - 行數:10,000,000
  - 平均行大小:200 bytes
  - 預估大小:2GB
customers 表統計:
  - 行數:10,000
  - 平均行大小:150 bytes
  - 預估大小:1.5MB
優化器計算:
  方案 A(orders 為 Build 側):
    - Build 階段記憶體:2GB(可能 Spill)
    - Probe 階段 I/O:讀取 1.5MB
    - 預估成本:高(Spilling 開銷)
    
  方案 B(customers 為 Build 側):✓
    - Build 階段記憶體:1.5MB(無需 Spill)
    - Probe 階段 I/O:讀取 2GB
    - 預估成本:低(全記憶體操作)
選擇:方案 B
統計資訊使優化器能夠:
DataFusion 的統計資訊定義在 datafusion-common/src/stats.rs 中:
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Statistics {
    /// 表的總行數(精確值或估算值)
    pub num_rows: Precision<usize>,
    
    /// 輸出數據的總字節數
    pub total_byte_size: Precision<usize>,
    
    /// 每個欄位的統計資訊
    pub column_statistics: Vec<ColumnStatistics>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ColumnStatistics {
    /// 空值數量
    pub null_count: Precision<usize>,
    
    /// 最小值
    pub min_value: Precision<ScalarValue>,
    
    /// 最大值
    pub max_value: Precision<ScalarValue>,
    
    /// 總和值(用於估算 AVG 等聚合)
    pub sum_value: Precision<ScalarValue>,
    
    /// 唯一值數量(基數,Cardinality)
    pub distinct_count: Precision<usize>,
}
DataFusion 用 Precision 枚舉來表達統計資訊的精確程度:
pub enum Precision<T> {
    /// 精確值(例如從 Parquet 元數據讀取)
    Exact(T),
    
    /// 估算值(例如通過採樣或傳播計算)
    Inexact(T),
    
    /// 缺失值(沒有統計資訊)
    Absent,
}
設計理念:
統計資訊的精確度層次:
Exact(精確):
  來源:Parquet 元數據、實際計數
  可信度:100%
  例子:num_rows = Exact(1000000)
Inexact(估算):
  來源:統計傳播、基於規則的估算
  可信度:70-90%
  例子:過濾後行數 = Inexact(500000)  // 根據選擇性估算
Absent(缺失):
  來源:無法獲取統計
  可信度:0%
  例子:CSV 文件沒有內建統計
  處理:優化器使用預設假設
這種設計讓優化器能夠區分「確定的事實」和「有根據的猜測」,做出更穩健的決策。
Parquet 是最「統計友好」的格式,內建豐富的統計資訊:
Parquet 檔案結構:
┌─────────────────────────────────────┐
│         File Metadata               │
│  - 總行數:10,000,000               │
│  - Row Group 數量:100              │
└─────────────────────────────────────┘
           ↓
┌─────────────────────────────────────┐
│      Row Group 0 Metadata           │
│  - 行數:100,000                    │
│  - Column Chunks:                   │
│    ┌─────────────────────────────┐ │
│    │ Column "user_id"             │ │
│    │  - min: 1                    │ │
│    │  - max: 50000                │ │
│    │  - null_count: 0             │ │
│    │  - distinct_count: 45231 (*)│ │
│    └─────────────────────────────┘ │
│    ┌─────────────────────────────┐ │
│    │ Column "age"                 │ │
│    │  - min: 18                   │ │
│    │  - max: 95                   │ │
│    │  - null_count: 123           │ │
│    └─────────────────────────────┘ │
└─────────────────────────────────────┘
(*) distinct_count 需要額外計算,不是所有 Parquet 文件都有
DataFusion 如何利用:
// 從 Parquet 元數據提取統計資訊的實際實現
// 來源:datafusion/datasource-parquet/src/metadata.rs
pub fn statistics_from_parquet_metadata(
    metadata: &ParquetMetaData,
    table_schema: &SchemaRef,
) -> Result<Statistics> {
    let row_groups_metadata = metadata.row_groups();
    
    let mut statistics = Statistics::new_unknown(table_schema);
    let mut has_statistics = false;
    let mut num_rows = 0_usize;
    let mut total_byte_size = 0_usize;
    
    // 遍歷所有 Row Group
    for row_group_meta in row_groups_metadata {
        num_rows += row_group_meta.num_rows() as usize;
        total_byte_size += row_group_meta.total_byte_size() as usize;
        
        // 檢查是否有任何欄位包含統計資訊
        if !has_statistics {
            has_statistics = row_group_meta
                .columns()
                .iter()
                .any(|column| column.statistics().is_some());
        }
    }
    
    // 行數和字節數總是精確的
    statistics.num_rows = Precision::Exact(num_rows);
    statistics.total_byte_size = Precision::Exact(total_byte_size);
    
    // 如果有統計資訊,提取欄位級別的 min/max/null_count
    if has_statistics {
        // 聚合所有 Row Group 的欄位統計
        // 對於 min:取所有 Row Group 中的最小值
        // 對於 max:取所有 Row Group 中的最大值  
        // 對於 null_count:累加所有 Row Group 的空值數量
        statistics.column_statistics = aggregate_column_stats(
            row_groups_metadata, 
            table_schema
        );
    } else {
        // 沒有統計資訊時,返回未知狀態
        statistics.column_statistics = Statistics::unknown_column(table_schema);
    }
    
    Ok(statistics)
}
關鍵處理邏輯:
Precision::Exact),直接從 Row Group 元數據累加min_value:取所有 Row Group 的最小值中的最小值max_value:取所有 Row Group 的最大值中的最大值null_count:累加所有 Row Group 的空值數量null_count = Precision::Exact(num_rows))自定義數據源可以實現 TableProvider::statistics() 方法:
#[async_trait]
impl TableProvider for MyCustomTable {
    async fn scan(&self, ...) -> Result<Arc<dyn ExecutionPlan>> {
        // ... 掃描邏輯
    }
    
    fn statistics(&self) -> Option<Statistics> {
        Some(Statistics {
            num_rows: Precision::Exact(self.row_count),
            total_byte_size: Precision::Inexact(self.estimated_size),
            column_statistics: self.build_column_stats(),
        })
    }
}
實例:記憶體表(MemTable)
// MemTable 直接計算精確統計
impl TableProvider for MemTable {
    fn statistics(&self) -> Option<Statistics> {
        let num_rows = self.batches.iter()
            .map(|batch| batch.num_rows())
            .sum();
        
        Some(Statistics {
            num_rows: Precision::Exact(num_rows),
            // ... 其他統計
        })
    }
}
CSV 等格式沒有內建統計,DataFusion 會返回 Absent:
impl ExecutionPlan for CsvExec {
    fn statistics(&self) -> Result<Statistics> {
        Ok(Statistics {
            num_rows: Precision::Absent,  // 不知道行數
            total_byte_size: Precision::Inexact(file_size),  // 只知道文件大小
            column_statistics: vec![],
        })
    }
}
統計資訊不僅來自數據源,還會在查詢計劃樹中向上傳播和估算。
查詢:
SELECT user_id, COUNT(*) 
FROM orders 
WHERE amount > 100 
GROUP BY user_id;
計劃樹與統計傳播:
┌─────────────────────────────────────┐
│  AggregateExec                      │
│  統計估算:                          │
│    num_rows = Inexact(50,000)       │  ← 基於 distinct(user_id) 估算
│    (GROUP BY 的輸出行數 ≈ 分組數)   │
└──────────────┬──────────────────────┘
               │
               ▼
┌─────────────────────────────────────┐
│  FilterExec (amount > 100)          │
│  統計估算:                          │
│    num_rows = Inexact(3,000,000)    │  ← 根據選擇性估算
│    (輸入 10M * 選擇性 0.3)          │
└──────────────┬──────────────────────┘
               │
               ▼
┌─────────────────────────────────────┐
│  ParquetExec                        │
│  統計來源:文件元數據(精確)        │
│    num_rows = Exact(10,000,000)     │
│    min(amount) = 10                 │
│    max(amount) = 10000              │
└─────────────────────────────────────┘
選擇性(Selectivity):過濾條件通過的行數比例
估算公式示例:
條件:amount > 100
已知統計:
  - min(amount) = 10
  - max(amount) = 10000
  - 假設均勻分佈
選擇性估算:
  selectivity = (max - 100) / (max - min)
              = (10000 - 100) / (10000 - 10)
              = 9900 / 9990
              ≈ 0.99
輸出行數 = 10,000,000 * 0.99 = 9,900,000
注意:這是簡化的估算。實際上 DataFusion 使用更複雜的模型,考慮數據傾斜、直方圖(Histogram)等。
Join 的輸出行數估算更複雜:
場景:orders JOIN customers ON orders.customer_id = customers.id
已知統計:
  orders.num_rows = 10,000,000
  customers.num_rows = 100,000
  distinct(orders.customer_id) = 80,000  ← 關鍵!
  distinct(customers.id) = 100,000
估算邏輯:
  - 如果是 INNER JOIN,輸出行數取決於外鍵關係
  - 假設每個 customer 平均有:10,000,000 / 80,000 ≈ 125 個訂單
  - 估算輸出:10,000,000 行(每個訂單對應一個客戶)
如果沒有 distinct_count:
  - 退化為簡單估算:min(orders.num_rows, customers.num_rows)
  - 或使用預設選擇性(如 0.1)
當有多個表 JOIN 時,順序很重要:
SELECT *
FROM large_table l
JOIN medium_table m ON l.m_id = m.id
JOIN small_table s ON m.s_id = s.id;
優化器決策過程:
原始順序:large → medium → small
統計資訊:
  large_table:  10,000,000 rows
  medium_table:    100,000 rows
  small_table:      1,000 rows
成本計算:
方案 A(原始順序):
  Step 1: large JOIN medium
    - Build: medium (100K rows → 20MB)
    - Probe: large (10M rows)
    - 中間結果:~8,000,000 rows(假設選擇性 0.8)
  
  Step 2: 中間結果 JOIN small
    - Build: small (1K rows → 200KB)
    - Probe: 中間結果 (8M rows)
    - 最終結果:6,000,000 rows
  
  總成本:Build(20MB) + Probe(10M) + Build(200KB) + Probe(8M) = 高
方案 B(優化後):medium → small → large
  Step 1: medium JOIN small
    - Build: small (1K rows → 200KB)
    - Probe: medium (100K rows)
    - 中間結果:80,000 rows
  
  Step 2: large JOIN 中間結果
    - Build: 中間結果 (80K rows → 16MB)
    - Probe: large (10M rows)
    - 最終結果:6,000,000 rows
  
  總成本:Build(200KB) + Probe(100K) + Build(16MB) + Probe(10M) = 低
選擇:方案 B(先 JOIN 小表)
實現提示:DataFusion 的 JoinSelection 優化規則會利用統計資訊來重新排列 JOIN 順序。
對於單個 Hash Join,DataFusion 的 JoinSelection 優化規則會智能選擇 Build 和 Probe 側。
實際邏輯(來自 datafusion/physical-optimizer/src/join_selection.rs):
/// 檢查是否應該交換 Join 的左右順序
/// 返回 true 表示應該交換(讓右側作為 Build 側)
pub(crate) fn should_swap_join_order(
    left: &dyn ExecutionPlan,
    right: &dyn ExecutionPlan,
) -> Result<bool> {
    let left_stats = left.partition_statistics(None)?;
    let right_stats = right.partition_statistics(None)?;
    
    // 優先使用 total_byte_size 來比較
    match (
        left_stats.total_byte_size.get_value(),
        right_stats.total_byte_size.get_value(),
    ) {
        (Some(l), Some(r)) => {
            // 如果左側字節數 > 右側,交換(讓小的右側作為 Build)
            return Ok(l > r);
        }
        _ => {
            // total_byte_size 不可用時,退化使用 num_rows
        }
    }
    
    // Fallback:使用行數比較
    match (
        left_stats.num_rows.get_value(),
        right_stats.num_rows.get_value(),
    ) {
        (Some(l), Some(r)) => {
            // 如果左側行數 > 右側,交換
            Ok(l > r)
        }
        _ => {
            // 完全沒有統計資訊,不交換
            Ok(false)
        }
    }
}
決策優先級:
1. total_byte_size(優先)
   ↓ 理由:更準確反映記憶體使用
   ↓ 例如:100 萬行 × 每行 1KB = 1GB
   ↓      vs 1000 萬行 × 每行 10B = 100MB
   ↓      行數多不代表佔用大
   
2. num_rows(備選)
   ↓ 理由:當沒有字節數統計時使用
   ↓ 假設:行大小相似
   
3. 保持原樣(無統計)
   ↓ 理由:避免盲目交換導致性能下降
配置閾值:
DataFusion 還考慮了「單分區收集」的閾值:
// 判斷是否可以將右側完全收集到單個分區(CollectLeft 模式)
fn supports_collect_by_thresholds(
    plan: &dyn ExecutionPlan,
    threshold_byte_size: usize,  // 預設: hash_join_single_partition_threshold
    threshold_num_rows: usize,   // 預設: hash_join_single_partition_threshold_rows
) -> bool {
    let stats = plan.partition_statistics(None);
    
    // 優先檢查字節數
    if let Some(byte_size) = stats.total_byte_size.get_value() {
        *byte_size != 0 && *byte_size < threshold_byte_size
    } 
    // 退化檢查行數
    else if let Some(num_rows) = stats.num_rows.get_value() {
        *num_rows != 0 && *num_rows < threshold_num_rows
    } 
    else {
        false
    }
}
這確保了只有當右側足夠小時,才會使用 CollectLeft 模式(將右側收集到單個分區)。
聚合的輸出行數取決於 GROUP BY 的基數:
SELECT user_id, COUNT(*) 
FROM orders 
GROUP BY user_id;
估算:
已知:distinct(user_id) = 50,000
輸出行數 = distinct(user_id) = 50,000
如果沒有 distinct_count:
  - 使用啟發式:min(num_rows, num_rows / 10)
  - 假設平均每組 10 行
這個估算影響下游算子的記憶體分配和並行度決策。
在某些情況下,你可能比引擎更了解數據特徵,可以手動提供統計資訊。
假設你實現了一個連接外部 API 的 TableProvider:
struct ApiTableProvider {
    schema: SchemaRef,
    api_client: ApiClient,
    // 你知道 API 大約返回多少行
    estimated_row_count: usize,
}
#[async_trait]
impl TableProvider for ApiTableProvider {
    async fn scan(...) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(Arc::new(ApiExec {
            // ...
        }))
    }
    
    fn statistics(&self) -> Option<Statistics> {
        Some(Statistics {
            // 提供估算的行數,幫助優化器
            num_rows: Precision::Inexact(self.estimated_row_count),
            total_byte_size: Precision::Inexact(
                self.estimated_row_count * 200  // 假設每行 200 字節
            ),
            column_statistics: vec![
                // 如果知道某些欄位的範圍,也可以提供
                ColumnStatistics {
                    null_count: Precision::Absent,
                    min_value: Precision::Exact(ScalarValue::Int32(Some(1))),
                    max_value: Precision::Exact(ScalarValue::Int32(Some(1000000))),
                    distinct_count: Precision::Inexact(50000),
                },
                // ... 其他欄位
            ],
        })
    }
}
使用 EXPLAIN 可以看到統計資訊如何影響計劃:
-- 假設我們有兩個表,一個有統計,一個沒有
CREATE EXTERNAL TABLE orders_with_stats 
STORED AS PARQUET 
LOCATION '/path/to/orders.parquet';  -- Parquet 有統計
CREATE EXTERNAL TABLE users_no_stats 
STORED AS CSV 
LOCATION '/path/to/users.csv';  -- CSV 無統計
-- 查看 JOIN 計劃
EXPLAIN SELECT * 
FROM orders_with_stats o 
JOIN users_no_stats u ON o.user_id = u.id;
可能的輸出:
PhysicalPlan:
  HashJoinExec: mode=CollectLeft, on=[(user_id, id)]
    ├── CsvExec: file=/path/to/users.csv           ← Build 側(默認選擇)
    └── ParquetExec: file=/path/to/orders.parquet  ← Probe 側
說明:由於 users 沒有統計,優化器無法確定大小,
      可能做出次優選擇(應該 orders 更大,但引擎不確定)
如果為 CSV 提供統計:
// 包裝 CsvExec,提供統計
struct CsvWithStats {
    inner: Arc<CsvExec>,
    stats: Statistics,
}
impl ExecutionPlan for CsvWithStats {
    fn statistics(&self) -> Result<Statistics> {
        Ok(self.stats.clone())  // 返回我們提供的統計
    }
    
    // 其他方法委託給 inner...
}
再次 EXPLAIN 可能會看到優化後的計劃:
PhysicalPlan:
  HashJoinExec: mode=CollectLeft, on=[(user_id, id)]
    ├── CsvExec: file=/path/to/users.csv (stats: 100K rows)  ← Build 側(小表)
    └── ParquetExec: file=/path/to/orders.parquet (stats: 10M rows)  ← Probe 側(大表)
優化器現在知道 users 只有 10 萬行,orders 有 1000 萬行,
正確選擇 users 作為 Build 側!
DataFusion 提供了多個與統計資訊相關的配置選項:
-- 設定單分區 Hash Join 的字節數閾值(預設 128MB)
SET datafusion.optimizer.hash_join_single_partition_threshold = 134217728;
-- 設定單分區 Hash Join 的行數閾值(預設 131072)
SET datafusion.optimizer.hash_join_single_partition_threshold_rows = 131072;
-- 是否優先使用 Hash Join(預設 true)
SET datafusion.optimizer.prefer_hash_join = true;
作用:當右側表的大小(字節數或行數)小於閾值時,使用 CollectLeft 模式將右側收集到單個分區,避免分區間的數據重分配開銷。
-- 是否啟用 Parquet 統計資訊(預設 true)
SET datafusion.execution.parquet.pruning = true;
-- 是否啟用 Page Index 進行更精細的過濾(預設 true)
SET datafusion.execution.parquet.enable_page_index = true;
-- 是否啟用 Bloom Filter 進行點查詢優化(預設 true)
SET datafusion.execution.parquet.bloom_filter_on_read = true;
作用:這些配置控制 DataFusion 如何利用 Parquet 檔案的統計資訊進行謂詞下推和數據剪枝。
使用 EXPLAIN VERBOSE 可以看到統計資訊的影響:
-- 顯示詳細的執行計劃和統計資訊
EXPLAIN VERBOSE SELECT * FROM orders o JOIN customers c ON o.customer_id = c.id;
輸出可能包含:
PhysicalPlan:
  HashJoinExec: mode=CollectLeft, on=[(customer_id, id)]
    stats=[num_rows=10000000, total_byte_size=2000000000]
    ├── ParquetExec: file=customers.parquet
    │   stats=[num_rows=100000, total_byte_size=15000000]  ← 右側較小
    └── ParquetExec: file=orders.parquet
        stats=[num_rows=10000000, total_byte_size=2000000000]
今天我們深入探討了 DataFusion 的統計資訊與基於成本的優化:
統計資訊的價值:讓優化器從「盲目猜測」變為「數據驅動決策」,在 Join 策略選擇、Join 順序優化、記憶體管理等方面帶來顯著性能提升
Statistics 結構設計:透過 num_rows、total_byte_size、column_statistics 描述數據特徵,用 Precision 枚舉區分精確值、估算值和缺失值
統計資訊來源:
Cost-Based 決策:
實戰技巧:
統計資訊是現代查詢引擎的「眼睛」,讓優化器能夠「看見」數據的真實面貌,做出明智的選擇。隨著 DataFusion 的演進,統計能力將更加豐富和智能,為查詢性能帶來持續的提升。
前面介紹了這麼多內容,對這個開源專案應該有更深的認識了,明天最後一天了,就來介紹如何貢獻 DataFusion 吧!